In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array_contains,explode, udf, get_json_object, expr,explode_outer, json_tuple
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType, FloatType
import json
from shapely.geometry import Point, MultiPolygon
In [2]:
spark = SparkSession.builder.appName("TaxiAnalyzer").getOrCreate()

Перед импортом файлов, они были предварительно залиты в hdfs с помощью команд:

./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/201902-citibike-tripdata.csv /HW2_tripdata ./hdfs dfs -copyFromLocal /home/ubuntu/Desktop/NYCTaxiZones.geojson /HW2_taxizones

Для более быстрого считывания определяю схему данных:

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
schema = StructType([
    StructField("tripduration", IntegerType(), True),
    StructField("starttime", TimestampType(), True),
    StructField("stoptime", TimestampType(), True),
    StructField("start station id", StringType(), True),
    StructField("start station name", StringType(), True),
    StructField("start station latitude", DoubleType(), True),
    StructField("start station longitude", DoubleType(), True),
    StructField("end station id", StringType(), True),
    StructField("end station name", StringType(), True),
    StructField("end station latitude", DoubleType(), True),
    StructField("end station longitude", DoubleType(), True),
    StructField("bikeid", IntegerType(), True),
    StructField("usertype", StringType(), True),
    StructField("birth year", IntegerType(), True),
    StructField("gender", IntegerType(), True)
])
In [4]:
trip_data  = spark.read.csv("/HW2_tripdata", header=True, schema=schema)

Удаляю ненужные столбцы

In [5]:
exclude = ["tripduration", "bikeid", "usertype", "birth year", "gender"]
trip_data = trip_data.drop(*exclude)

Задача 1

определите для каждой станции количество начала поездок и количество завершения поездок

In [6]:
start_count = trip_data.groupBy("start station id").count().withColumnRenamed("count", "start_count")
start_count.show()
+----------------+-----------+
|start station id|start_count|
+----------------+-----------+
|             296|       1795|
|            3606|        215|
|             467|       1276|
|            3414|        656|
|            3368|        745|
|            3517|        299|
|            3441|        263|
|            3526|        604|
|             447|       2512|
|            3312|        788|
|            3121|        291|
|            3249|        341|
|            3167|       2820|
|             307|       3028|
|            3491|        398|
|            3057|        480|
|             334|       2905|
|            3553|        798|
|            3241|        453|
|            3299|        256|
+----------------+-----------+
only showing top 20 rows

In [7]:
end_count = trip_data.groupBy("end station id").count().withColumnRenamed("count", "end_count")
end_count.show()
+--------------+---------+
|end station id|end_count|
+--------------+---------+
|           467|     1312|
|           296|     1825|
|          3414|      681|
|          3606|      224|
|          3441|      268|
|          3368|      769|
|          3517|      301|
|          3526|      606|
|           447|     2499|
|          3312|      791|
|          3249|      340|
|          3121|      295|
|          3167|     2693|
|          3491|      399|
|           307|     3006|
|          3057|      451|
|           334|     2912|
|          3241|      432|
|          3299|      262|
|          3553|      818|
+--------------+---------+
only showing top 20 rows

сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала

не смог адекватно спарсить geojson с мультиполигонами. Кажется, для этого бы идеально подошла библиотека geopandas. Поэтому спарсил в формате property и их geometry

In [8]:
nyc_geojson_content = spark.sparkContext.wholeTextFiles("/HW2_taxizones").values().collect()[0]
nyc_geojson = json.loads(nyc_geojson_content)
rdd = spark.sparkContext.parallelize(nyc_geojson['features'])
In [9]:
schema_zones = StructType([
    StructField("geometry", StructType([
        StructField("coordinates", ArrayType(ArrayType(ArrayType(ArrayType(DoubleType())))), True),

    ]), True),
    StructField("properties", StructType([
        StructField("shape_area", StringType(), True),
        StructField("objectid", StringType(), True),
        StructField("shape_leng", StringType(), True),
        StructField("location_id", StringType(), True),
        StructField("zone", StringType(), True),
        StructField("borough", StringType(), True),
    ]), True),
])
zones_df = spark.createDataFrame(rdd, schema=schema_zones)
In [10]:
zones_df.printSchema()
root
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |-- properties: struct (nullable = true)
 |    |-- shape_area: string (nullable = true)
 |    |-- objectid: string (nullable = true)
 |    |-- shape_leng: string (nullable = true)
 |    |-- location_id: string (nullable = true)
 |    |-- zone: string (nullable = true)
 |    |-- borough: string (nullable = true)

Для избежания сложных операций по передачи zones_df в udf использую broadcas на узлы кластера

In [11]:
zones_dict = {row['properties']['location_id']: row['geometry']['coordinates']
              for row in zones_df.collect()}
In [12]:
from pyspark.sql.functions import broadcast
zones_dict_bc = spark.sparkContext.broadcast(zones_dict)
In [13]:
def find_zone_udf(lat, lon):
    point = Point(lon, lat)
    for location_id, polygon_coordinates in zones_dict_bc.value.items():
        multi_polygon = MultiPolygon(polygon_coordinates)
        if point.within(multi_polygon):
            return location_id
    return "Не найдено подходящей зоны"
find_zone = udf(find_zone_udf, StringType())
In [14]:
trip_data  = spark.read.csv("/HW2_tripdata", header=True, schema=schema)
trip_data = trip_data.limit(10000)
In [15]:
trip_data = trip_data.withColumn("start_zone", find_zone("start station latitude", "start station longitude"))
trip_data = trip_data.withColumn("end_zone", find_zone("end station latitude", "end station longitude"))
In [16]:
start_zone_counts = trip_data.groupBy("start_zone").count()
end_zone_counts = trip_data.groupBy("end_zone").count()
In [17]:
start_zone_info = start_zone_counts.join(zones_df, start_zone_counts.start_zone == zones_df.properties.location_id, "inner")
end_zone_info = end_zone_counts.join(zones_df, end_zone_counts.end_zone == zones_df.properties.location_id, "inner")
In [18]:
start_zone_info = start_zone_info.select("start_zone", "count", "geometry.coordinates")
end_zone_info = end_zone_info.select("end_zone", "count", "geometry.coordinates")
In [19]:
spark.conf.set("spark.sql.broadcastTimeout", "3600")
In [29]:
start_zone_info = start_zone_info.orderBy(F.desc("count")).cache()
start_zone_info.show()
+----------+-----+--------------------+
|start_zone|count|         coordinates|
+----------+-----+--------------------+
|        79|  545|[[[[-73.983779909...|
|        68|  494|[[[[-74.002019377...|
|       170|  385|[[[[-73.972033310...|
|        48|  337|[[[[-73.991177381...|
|       186|  257|[[[[-73.990968327...|
|       100|  256|[[[[-73.987293770...|
|       234|  247|[[[[-73.989969363...|
|       232|  236|[[[[-73.976355251...|
|       224|  232|[[[[-73.975110553...|
|       148|  224|[[[[-73.984477316...|
|        90|  217|[[[[-73.996336894...|
|        97|  213|[[[[-73.969340976...|
|       231|  212|[[[[-74.009317603...|
|       113|  210|[[[[-73.991363880...|
|       246|  192|[[[[-74.004399762...|
|        43|  191|[[[[-73.972553522...|
|       137|  189|[[[[-73.972196416...|
|       181|  182|[[[[-73.975831322...|
|       142|  173|[[[[-73.981478987...|
|       249|  160|[[[[-74.002506423...|
+----------+-----+--------------------+
only showing top 20 rows

In [30]:
end_zone_info = end_zone_info.orderBy(F.desc("count")).cache()
end_zone_info.show()
+--------+-----+--------------------+
|end_zone|count|         coordinates|
+--------+-----+--------------------+
|     234|  468|[[[[-73.989969363...|
|     170|  391|[[[[-73.972033310...|
|      68|  352|[[[[-74.002019377...|
|     231|  349|[[[[-74.009317603...|
|     161|  304|[[[[-73.975343779...|
|      79|  299|[[[[-73.983779909...|
|     113|  280|[[[[-73.991363880...|
|     162|  279|[[[[-73.971217044...|
|     137|  258|[[[[-73.972196416...|
|     230|  249|[[[[-73.983624332...|
|     100|  248|[[[[-73.987293770...|
|      90|  242|[[[[-73.996336894...|
|     246|  230|[[[[-74.004399762...|
|      48|  227|[[[[-73.991177381...|
|      97|  221|[[[[-73.969340976...|
|     164|  218|[[[[-73.983625199...|
|      87|  203|[[[[-74.003722348...|
|     148|  180|[[[[-73.984477316...|
|     181|  178|[[[[-73.975831322...|
|     255|  172|[[[[-73.961760703...|
+--------+-----+--------------------+
only showing top 20 rows

In [22]:
#df = spark.read.json("/HW2_taxizones")
In [23]:
#df.printSchema()

К сожалению не смог корректно спарсить этот Json в spark dataframe, сохранив исходную структуру. Поэтому взял его локально для построения мультиполигонов.

In [31]:
with open('/home/ubuntu/Desktop/NYCTaxiZones.geojson') as f:
    nyc_geojson = json.load(f)

Картограмма количеств завершения для каждого квартала

In [32]:
import plotly.express as px

fig = px.choropleth_mapbox(end_zone_info, geojson=nyc_geojson, locations='end_zone', 
                           color='count',
                           mapbox_style="carto-positron",
                           zoom=10.3, 
                           center = {"lat": 40.75594159, "lon": -74.0021163},
                           opacity=0.5,
                           featureidkey="properties.location_id"  
                          )
fig.show()

Картограмма количеств начала для каждого квартала

In [33]:
fig = px.choropleth_mapbox(start_zone_info, geojson=nyc_geojson, locations='start_zone', 
                           color='count',
                           mapbox_style="carto-positron",
                           zoom=10.3, 
                           center = {"lat": 40.75594159, "lon": -74.0021163},
                           opacity=0.5,
                           featureidkey="properties.location_id"  
                          )
fig.show()

выведите максимальное, среднее значение, стандартное отклонение и медиан

In [34]:
trip_data  = spark.read.csv("/HW2_tripdata", header=True, schema=schema)
In [35]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import math

# Функция для расчета дистанции
def calculate_distance(lat1, lon1, lat2, lon2):
    # Радиус Земли
    R = 6371.0

    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)
    
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    a = math.sin(dlat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c * 1000 
    return distance
distance_udf = F.udf(calculate_distance, DoubleType())
In [36]:
# фильтрация поездок с одинаковой начальной и конечной точкой
trip_data_filtered = trip_data.filter(
    (trip_data["start station latitude"] != trip_data["end station latitude"]) | 
    (trip_data["start station longitude"] != trip_data["end station longitude"])
)
In [37]:
# добавляю колонку с дистанцией поездки
trip_data_with_distance = trip_data_filtered.withColumn(
    "distance",
    distance_udf(
    F.col("start station latitude"),
    F.col("start station longitude"),
    F.col("end station latitude"),
    F.col("end station longitude")
    )
)
In [38]:
#trip_data_with_distance.show()
In [39]:
# считаю статистические данные
stats = trip_data_with_distance.select(
F.max("distance").alias("max_distance"),
F.mean("distance").alias("mean_distance"),
F.stddev("distance").alias("stddev_distance"),
F.expr("percentile(distance, 0.5)").alias("median_distance") # Медиана
).collect()
In [40]:
max_distance, mean_distance, stddev_distance, median_distance = stats[0]
print(f"Максимальное расстояние: {max_distance} м")
print(f"Среднее расстояние: {mean_distance} м")
print(f"Стандартное отклонение: {stddev_distance} м")
print(f"Медиана расстояний: {median_distance} м")
Максимальное расстояние: 15326.431486694262 м
Среднее расстояние: 1640.3728092017368 м
Стандартное отклонение: 1288.675655142281 м
Медиана расстояний: 1262.8745712441507 м

определите для каждой станции среднее количество начала поездок и количество завершения поездок:

  • в день
  • утром (06:00-11:59), днем (12:00-17:59), вечером (18:00-23:59), ночью (00:00-05:59)
  • в среду и в воскресенье по временным диапазонам (см. выше)
  • отобразите полученные данные для второго случая в виде тепловой временной карты (HeatMapWithTime)

в день

In [41]:
from pyspark.sql.functions import col, hour, dayofweek, date_format, to_date
from pyspark.sql.types import TimestampType
# конвертация форматов
trip_data = trip_data.withColumn("start_date", to_date("starttime")) \
                     .withColumn("stop_date", to_date("stoptime"))
In [42]:
# группировка и подсчет поездок по дням
start_counts = trip_data.groupBy("start station id", "start_date") \
                        .count() \
                        .withColumnRenamed("count", "start_count") \
                        .withColumnRenamed("start station id", "station_id")
stop_counts = trip_data.groupBy("end station id", "stop_date") \
                       .count() \
                       .withColumnRenamed("count", "stop_count") \
                       .withColumnRenamed("end station id", "station_id")
In [43]:
# соединение полученных подсчетов и заполнение нулями пропусков
daily_counts = start_counts.join(stop_counts, (start_counts.station_id == stop_counts.station_id) & \
                                                (start_counts.start_date == stop_counts.stop_date), "outer") \
                           .select(start_counts.station_id, "start_date", "start_count", "stop_count")
daily_counts = daily_counts.na.fill(0)
In [44]:
# вычисление средних значений и аггрегация
average_counts = daily_counts.groupBy("station_id") \
                             .agg(F.avg("start_count").alias("avg_daily_starts"), 
                                  F.avg("stop_count").alias("avg_daily_stops"))
In [45]:
average_counts.show(10)
+----------+------------------+------------------+
|station_id|  avg_daily_starts|   avg_daily_stops|
+----------+------------------+------------------+
|       296| 64.10714285714286| 65.07142857142857|
|      3414|23.428571428571427|24.321428571428573|
|      3606| 7.678571428571429|               8.0|
|       467| 45.57142857142857| 46.82142857142857|
|      3441| 9.392857142857142| 9.571428571428571|
|      3517|10.678571428571429|             10.75|
|      3368|26.607142857142858|27.464285714285715|
|      3526|21.571428571428573|21.642857142857142|
|      3249|12.178571428571429|12.142857142857142|
|       447| 89.71428571428571|             89.25|
+----------+------------------+------------------+
only showing top 10 rows

интервалы дня:

In [46]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Функция для определения времени суток
def get_time_of_day(hour):
    if 6 <= hour < 12:
        return 'Morning'
    elif 12 <= hour < 18:
        return 'Afternoon'
    elif 18 <= hour < 24:
        return 'Evening'
    else:
        return 'Night'
In [47]:
udf_time_of_day = F.udf(get_time_of_day, StringType())
In [48]:
# добавление столбцов с меткой интервала дня
trip_data = trip_data.withColumn('start_date', F.to_date('starttime')) \
                     .withColumn('stop_date', F.to_date('stoptime')) \
                     .withColumn('start_time_of_day', udf_time_of_day(F.hour('starttime'))) \
                     .withColumn('stop_time_of_day', udf_time_of_day(F.hour('stoptime'))) \
In [49]:
# группировка и подсчет поездок по интервалам
start_stations = trip_data.groupBy("start station id", "start_time_of_day").count()
end_stations = trip_data.groupBy("end station id", "stop_time_of_day").count()
In [50]:
# подготовка данных,слияние, заполнение пропусков
start_stations = start_stations.withColumnRenamed("start station id", "station_id") \
                                .withColumnRenamed("count", "start_count") \
                                .withColumnRenamed("start_time_of_day", "time_of_day")
end_stations = end_stations.withColumnRenamed("end station id", "station_id") \
                                .withColumnRenamed("count", "end_count") \
                                .withColumnRenamed("stop_time_of_day", "time_of_day")
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)
In [51]:
# вычисление средних значений и аггрегация
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends")
)
In [52]:
final_data = final_data.orderBy("station_id", "time_of_day")
In [53]:
final_data.show()
+----------+-----------+--------------+------------+
|station_id|time_of_day|average_starts|average_ends|
+----------+-----------+--------------+------------+
|       119|  Afternoon|          68.0|       126.0|
|       119|    Evening|          23.0|        43.0|
|       119|    Morning|          78.0|        47.0|
|       119|      Night|           2.0|         1.0|
|       120|  Afternoon|         183.0|       200.0|
|       120|    Evening|         112.0|       232.0|
|       120|    Morning|         283.0|        78.0|
|       120|      Night|          15.0|        14.0|
|       127|  Afternoon|        1060.0|      1123.0|
|       127|    Evening|         664.0|       720.0|
|       127|    Morning|        1188.0|      1136.0|
|       127|      Night|          28.0|        31.0|
|       128|  Afternoon|        1091.0|      1049.0|
|       128|    Evening|         744.0|       738.0|
|       128|    Morning|         716.0|       840.0|
|       128|      Night|          88.0|        44.0|
|       143|  Afternoon|         555.0|       386.0|
|       143|    Evening|         348.0|       237.0|
|       143|    Morning|         456.0|       741.0|
|       143|      Night|          28.0|        16.0|
+----------+-----------+--------------+------------+
only showing top 20 rows

In [54]:
# station_time_stats = final_data.groupBy("station_id", "time_of_day").agg(
#     F.avg("average_starts").alias("avg_start_trips"),
#     F.avg("average_ends").alias("avg_end_trips")
# ).orderBy("station_id", "time_of_day")
# station_time_stats.show()

prepare wor fizualize

In [55]:
# соединяю с исходным датасетом и добавляю оттуда координаты
final_data_with_coords = final_data.alias("final").join(
    trip_data.select("start station id", "start station latitude", "start station longitude").distinct().alias("trip"),
    col("final.station_id") == col("trip.start station id"), 
    "inner"
)
In [56]:
#final_data_with_coords.show()

visualize

In [57]:
final_data_pandas = final_data_with_coords.toPandas()
In [58]:
import folium
from folium.plugins import HeatMapWithTime, HeatMap
In [59]:
map = folium.Map(location=[final_data_pandas['start station latitude'].mean(), final_data_pandas['start station longitude'].mean()], zoom_start=13)
In [60]:
grouped_data = final_data_pandas.groupby('time_of_day')
In [61]:
data_by_time = []
for time, group in grouped_data:
    data = group[['start station latitude', 'start station longitude', 'average_starts']].values.tolist()
    data_by_time.append(data)
In [63]:
HeatMapWithTime(data_by_time).add_to(map)
Out[63]:
<folium.plugins.heat_map_withtime.HeatMapWithTime at 0x7f5fcc42fcc0>
In [64]:
map
Out[64]:
Make this Notebook Trusted to load map: File -> Trust Notebook

Для вычисления завершений поездок достаточно поменять параметр группировки в цикле по grouped_data, но из-за уже большого обьема ноутбука я этот шаг пропущу

в среду и в воскресенье по временным диапазонам

использовал dayofweek чтобы получить номер дня недели ( по умолчанию 1 - воскресенье https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.dayofweek.html)

In [65]:
trip_data = trip_data.withColumn('start_day_of_week', dayofweek('start_date')) \
                     .withColumn('stop_day_of_week', dayofweek('stop_date'))

Достаточно применить фильтрацию к исходному датасету, и прогнать по такому же алгоритму

In [66]:
# Фильтруем по датам начала и завершения поездок = среде (4) и воскресенью (1)
trip_data = trip_data.filter((trip_data.start_day_of_week == 4) | (trip_data.start_day_of_week == 1)) \
                     .filter((trip_data.stop_day_of_week == 4) | (trip_data.stop_day_of_week == 1))
In [67]:
# группировка и подсчет поездок по интервалам
start_stations = trip_data.groupBy("start station id", "start_time_of_day").count()
end_stations = trip_data.groupBy("end station id", "stop_time_of_day").count()
# подготовка данных,слияние, заполнение пропусков
start_stations = start_stations.withColumnRenamed("start station id", "station_id") \
                                .withColumnRenamed("count", "start_count") \
                                .withColumnRenamed("start_time_of_day", "time_of_day")
end_stations = end_stations.withColumnRenamed("end station id", "station_id") \
                                .withColumnRenamed("count", "end_count") \
                                .withColumnRenamed("stop_time_of_day", "time_of_day")
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

# вычисление средних значений и аггрегация
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends")
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
+----------+-----------+--------------+------------+
|station_id|time_of_day|average_starts|average_ends|
+----------+-----------+--------------+------------+
|       119|  Afternoon|          17.0|        39.0|
|       119|    Evening|           5.0|         8.0|
|       119|    Morning|          25.0|        13.0|
|       119|      Night|           2.0|         1.0|
|       120|  Afternoon|          61.0|        65.0|
|       120|    Evening|          29.0|        52.0|
|       120|    Morning|          79.0|        25.0|
|       120|      Night|           7.0|         7.0|
|       127|  Afternoon|         286.0|       288.0|
|       127|    Evening|         123.0|       134.0|
|       127|    Morning|         298.0|       305.0|
|       127|      Night|           9.0|        10.0|
|       128|  Afternoon|         274.0|       266.0|
|       128|    Evening|         132.0|       127.0|
|       128|    Morning|         167.0|       202.0|
|       128|      Night|          28.0|        11.0|
|       143|  Afternoon|         153.0|       101.0|
|       143|    Evening|          73.0|        37.0|
|       143|    Morning|         119.0|       176.0|
|       143|      Night|           6.0|         4.0|
+----------+-----------+--------------+------------+
only showing top 20 rows

по тз стоит условие "И". Но интереснее посмотреть и сравнить будний и выходной день

In [68]:
trip_data_wednesday = trip_data.filter((trip_data.start_day_of_week == 4)) \
                     .filter((trip_data.stop_day_of_week == 4))
In [69]:
# группировка и подсчет поездок по интервалам
start_stations = trip_data_wednesday.groupBy("start station id", "start_time_of_day").count()
end_stations = trip_data_wednesday.groupBy("end station id", "stop_time_of_day").count()
# подготовка данных,слияние, заполнение пропусков
start_stations = start_stations.withColumnRenamed("start station id", "station_id") \
                                .withColumnRenamed("count", "start_count") \
                                .withColumnRenamed("start_time_of_day", "time_of_day")
end_stations = end_stations.withColumnRenamed("end station id", "station_id") \
                                .withColumnRenamed("count", "end_count") \
                                .withColumnRenamed("stop_time_of_day", "time_of_day")
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

# вычисление средних значений и аггрегация
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends")
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
+----------+-----------+--------------+------------+
|station_id|time_of_day|average_starts|average_ends|
+----------+-----------+--------------+------------+
|       119|  Afternoon|          10.0|        27.0|
|       119|    Evening|           3.0|         7.0|
|       119|    Morning|          22.0|        13.0|
|       119|      Night|           1.0|         1.0|
|       120|  Afternoon|          12.0|        14.0|
|       120|    Evening|          10.0|        21.0|
|       120|    Morning|          55.0|        11.0|
|       120|      Night|           1.0|         0.0|
|       127|  Afternoon|         130.0|       129.0|
|       127|    Evening|          82.0|        89.0|
|       127|    Morning|         243.0|       241.0|
|       127|      Night|           3.0|         3.0|
|       128|  Afternoon|         115.0|        96.0|
|       128|    Evening|          76.0|        74.0|
|       128|    Morning|         108.0|       146.0|
|       128|      Night|          15.0|         2.0|
|       143|  Afternoon|          73.0|        35.0|
|       143|    Evening|          55.0|        20.0|
|       143|    Morning|          85.0|       136.0|
|       143|      Night|           4.0|         2.0|
+----------+-----------+--------------+------------+
only showing top 20 rows

In [70]:
trip_data_sunday = trip_data.filter((trip_data.start_day_of_week == 1)) \
                     .filter((trip_data.stop_day_of_week == 1))
In [71]:
# группировка и подсчет поездок по интервалам
start_stations = trip_data_sunday.groupBy("start station id", "start_time_of_day").count()
end_stations = trip_data_sunday.groupBy("end station id", "stop_time_of_day").count()
# подготовка данных,слияние, заполнение пропусков
start_stations = start_stations.withColumnRenamed("start station id", "station_id") \
                                .withColumnRenamed("count", "start_count") \
                                .withColumnRenamed("start_time_of_day", "time_of_day")
end_stations = end_stations.withColumnRenamed("end station id", "station_id") \
                                .withColumnRenamed("count", "end_count") \
                                .withColumnRenamed("stop_time_of_day", "time_of_day")
combined_data = start_stations.join(end_stations, ["station_id", "time_of_day"], "outer")
combined_data = combined_data.na.fill(0)

# вычисление средних значений и аггрегация
final_data = combined_data.groupBy("station_id", "time_of_day").agg(
    F.avg("start_count").alias("average_starts"),
    F.avg("end_count").alias("average_ends")
)
final_data = final_data.orderBy("station_id", "time_of_day")
final_data.show()
+----------+-----------+--------------+------------+
|station_id|time_of_day|average_starts|average_ends|
+----------+-----------+--------------+------------+
|       119|  Afternoon|           7.0|        12.0|
|       119|    Evening|           2.0|         1.0|
|       119|    Morning|           3.0|         0.0|
|       119|      Night|           1.0|         0.0|
|       120|  Afternoon|          49.0|        51.0|
|       120|    Evening|          19.0|        31.0|
|       120|    Morning|          24.0|        14.0|
|       120|      Night|           6.0|         7.0|
|       127|  Afternoon|         156.0|       159.0|
|       127|    Evening|          41.0|        45.0|
|       127|    Morning|          55.0|        64.0|
|       127|      Night|           6.0|         7.0|
|       128|  Afternoon|         159.0|       170.0|
|       128|    Evening|          56.0|        53.0|
|       128|    Morning|          59.0|        56.0|
|       128|      Night|          13.0|         9.0|
|       143|  Afternoon|          80.0|        66.0|
|       143|    Evening|          18.0|        17.0|
|       143|    Morning|          34.0|        40.0|
|       143|      Night|           2.0|         2.0|
+----------+-----------+--------------+------------+
only showing top 20 rows